package poolserver

import (
	"encoding/binary"
	"net"
	"sync/atomic"
	"time"

	collector "csudata.com/zqpool/src/collector"
	"go.uber.org/zap"
)

/*获得第一个消息包*/
func (ctx *ConnContext) recvFirstRequest() error {
	var pos int32
	var err error
	var n int
	var pktpos int
	var pktlen int

	ctx.msgCount = 0
	pos = 0
	pktpos = 0

	for pos < 5 {
		/*先收消息头*/
		n, err = ctx.cliConn.Read(ctx.recvBuf[pos:5])
		if err != nil {
			if err != nil {
				return err
			}
		}
		pos += int32(n)
		break
	}

	/*获得消息的长度*/
	pktlen = int(binary.BigEndian.Uint32(ctx.recvBuf[pktpos+1 : pktpos+5]))
	/*把整个消息整个收下来*/
	for pos < int32(pktpos+pktlen)+1 {
		/*收消息后续的内容*/
		n, err = ctx.cliConn.Read(ctx.recvBuf[pos:5])
		if err != nil {
			if err != nil {
				return err
			}
		}
		pos += int32(n)
		if pos > DEFAULT_BUF_SIZE-64 {
			zap.S().Panicf("Client(%d): request too long!", ctx.Pid)
		}
	}
	ctx.msgList[ctx.msgCount] = ctx.recvBuf[pktpos : pktpos+pktlen+1]
	return nil
}

/*
recvRequest reads one client request into ctx.recvBuf and splits it into
individual protocol messages stored in ctx.msgList (msgCount entries).

Packet layout: 1 type byte + 4-byte big-endian length (the length covers
itself but not the type byte), so one packet occupies pktlen+1 bytes.
Ordinary messages return after the first complete packet; for extended
protocol requests ('P'/'B'/'D', judged by the FIRST byte of the buffer)
messages keep accumulating until a Sync ('S') packet arrives.  The
receive buffer is grown on demand when a packet does not fit.
*/
func (ctx *ConnContext) recvRequest() error {
	var pos int32 // total bytes received so far in recvBuf
	var err error
	var n int
	var pktpos int // start offset of the packet currently being parsed
	var pktlen int // value of the current packet's length field

	ctx.msgCount = 0
	pos = 0
	pktpos = 0

	for {
		//TCHDEBUG zap.S().Infof("Client(%d): RC: begin...", ctx.Pid)
		n, err = ctx.cliConn.Read(ctx.recvBuf[pos:ctx.recvBufSize])
		if err != nil {
			return err
		}

		pos += int32(n)
		if pos < int32(pktpos)+5 {
			continue // not even a full header yet; keep reading
		}
		/* NOTE(review): msgType is always taken from offset 0, i.e. the
		type of the FIRST message of the request; later packets are only
		inspected via msgList below — confirm this is intentional. */
		msgType := ctx.recvBuf[0]
		pktlen = int(binary.BigEndian.Uint32(ctx.recvBuf[pktpos+1 : pktpos+5]))
		if int32(pktpos+pktlen+1) > ctx.recvBufSize {
			/* Buffer too small for this packet: grow it. */
			newBufSize := pktpos + pktlen + 1 + 5 + int(DEFAULT_BUF_SIZE)
			//newBufSize := pktpos + pktlen + 1 + 5

			recvBuf := make([]byte, newBufSize)
			copy(recvBuf, ctx.recvBuf[:ctx.recvBufSize])
			zap.S().Infof("Client(%d): recv buffer from %d increase to %d", ctx.Pid, ctx.recvBufSize, newBufSize)
			ctx.recvBufSize = int32(newBufSize)
			ctx.recvBuf = recvBuf
		}

		if pos < int32(pktpos+pktlen)+1 {
			continue // current packet not fully received yet
		}

	nextmsg:
		/* A complete packet sits at [pktpos, pktpos+pktlen+1): record it. */
		ctx.msgList[ctx.msgCount] = ctx.recvBuf[pktpos : pktpos+pktlen+1]
		ctx.msgCount++
		/* At this point the first message is guaranteed to be complete. */
		if msgType != 'P' && msgType != 'B' && msgType != 'D' {
			/* Not an extended-protocol message: one message per request. */
			return nil
		}

		/* Extended protocol needs several messages: keep receiving until
		a Sync ('S') message is seen, then return. */
		if ctx.msgList[ctx.msgCount-1][0] == 'S' {
			return nil
		}
		pktpos += pktlen + 1
		if int32(pktpos+5) > ctx.recvBufSize {
			/* Not enough room for the next header: grow the buffer. */
			newBufSize := pktpos + 5 + int(DEFAULT_BUF_SIZE)
			//newBufSize := pktpos + 5

			recvBuf := make([]byte, newBufSize)
			copy(recvBuf, ctx.recvBuf[:ctx.recvBufSize])
			zap.S().Infof("Client(%d): recv buffer from %d increase to %d", ctx.Pid, ctx.recvBufSize, newBufSize)
			ctx.recvBufSize = int32(newBufSize)
			ctx.recvBuf = recvBuf
		}

		if int32(pktpos+5) > pos { /* next header not received yet: read more */
			continue
		}
		pktlen = int(binary.BigEndian.Uint32(ctx.recvBuf[pktpos+1 : pktpos+5]))
		if pktpos+pktlen+1 <= int(pos) { /* another complete packet is already buffered */
			goto nextmsg
		}

	}

	// Unreachable: the loop above only exits via return.
	return nil
}

// ProcessX handles the frontend Terminate ('X') message: roll back any
// open transaction on the held backend, return the backend connection to
// the pool, and close the client socket.
func (ctx *ConnContext) ProcessX() {
	back := ctx.pBackConn
	if ctx.transState != 'I' {
		// An in-progress transaction must not leak onto the next pool user.
		CleanupTrans(back.Conn)
	}
	if ctx.isGetBackend {
		zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, back.Id)
		atomic.StoreInt32(&back.State, 0)
	}
	ctx.cliConn.Close()
	zap.S().Infof("Client(%d): Close Connect", ctx.Pid)
}

/*
ProcessQ handles a simple 'Q' (Query) message: acquire a backend from the
pool if none is held, forward the query, and stream the response back via
forwardToClient.

Send failures outside a transaction are retried (with backend reconnect
and a 50 ms pause) for up to ~20 seconds; inside a transaction the session
is unrecoverable and the error is returned immediately.
*/
func (ctx *ConnContext) ProcessQ() error {
	var err error

	ctx.isInUnnamedPrepared = false

	if !ctx.isGetBackend {
		ctx.pBackConn = getBackend(ctx.pool)
		zap.S().Debugf("Client(%d): get backend(%d)", ctx.Pid, ctx.pBackConn.Id)
		ctx.isGetBackend = true
	}

	/* Forward the query to the backend, retrying on send failure. */
	var errBeginTime int64 = 0
	for {
		zap.S().Debugf("Client(%d, Q): SB(%d): %v", ctx.Pid, ctx.pBackConn.Id, ctx.msgList[0])
		_, err = sendData(ctx.pBackConn.Conn, ctx.msgList[0])
		if err == nil {
			break
		}
		if errBeginTime == 0 {
			errBeginTime = time.Now().Unix() /* start of the failure window */
		}
		zap.S().Infof("Client(%d, Q): SB(%d): ERROR: %s", ctx.Pid, ctx.pBackConn.Id, err.Error())
		if ctx.transState != 'I' { /* inside a transaction: not recoverable */
			ctx.cliConn.Close()
			ctx.BeReconnect()
			return err
		}
		if time.Now().Unix()-errBeginTime > 20 {
			zap.S().Infof("ERROR: wait 20 seconds, but the backend(%v) is still not recovered!", ctx.pool.Conf.PortalList)
			ctx.cliConn.Close()
			ctx.BeReconnect()
			return err
		}
		time.Sleep(50 * time.Millisecond)
		ctx.BeReconnect()
	}

	ctx.pSendBackConn = ctx.pBackConn
	clientErr, backendErr := ctx.forwardToClient()
	if clientErr != nil {
		return clientErr
	}
	return backendErr
}

/*
forwardToClient streams the backend's response to the client until a
ReadyForQuery ('Z') packet ends the response, tracking packet boundaries
across Read calls (a packet header may straddle two reads).

Returns (clientErr, backendErr): clientErr is non-nil when sending to the
client failed, backendErr when reading from the backend failed; (nil, nil)
on success.
*/
func (ctx *ConnContext) forwardToClient() (error, error) {
	var n int
	var clientErr error = nil
	var backendErr error = nil
	var pos int    // bytes of backend data currently held in recvBuf
	var pktPos int // offset of the current packet boundary (may go negative; see below)
	var pktLen int
	var msgType byte
	var hdrPos int // bytes of the 6-byte header collected so far
	var copyLen int

	//TCHDEBUG zap.S().Infof("*****RETURN Backend(%d) => Client(%d)", ctx.pBackConn.Id, ctx.Pid)
	if ctx.pSendBackConn.Id != ctx.pBackConn.Id {
		zap.S().Panicf("recv backend not the send backend!!!")
	}

	pos = 0                   // write position within the receive buffer
	pktPos = 0                // start offset of the current packet
	pktHdr := make([]byte, 6) /* first 6 bytes of the current packet (type + length + 1 payload byte) */
	hdrPos = 0
	for {
		//TCHDEBUG zap.S().Infof("Client(%d): RB(%d): begin ...", ctx.Pid, ctx.pBackConn.Id)
		n, backendErr = ctx.pBackConn.Conn.Read(ctx.recvBuf[pos:ctx.recvBufSize])
		if backendErr != nil {
			zap.S().Infof("Client(%d): RB(%d): ERROR: %s", ctx.Pid, ctx.pBackConn.Id, backendErr.Error())
			ctx.cliConn.Close()
			ctx.BeReconnect()

			zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id)
			atomic.StoreInt32(&ctx.pBackConn.State, 0)
			ctx.isGetBackend = false
			return nil, backendErr
		}
		pos += n

		/* Forward everything received so far to the client. */
		zap.S().Debugf("Client(%d, Q): RB(%d): %v", ctx.Pid, ctx.pBackConn.Id, ctx.recvBuf[0:pos])
		_, clientErr = sendData(ctx.cliConn, ctx.recvBuf[0:pos])
		if clientErr != nil {
			zap.S().Infof("Client(%d): SC: ERROR: %s", ctx.Pid, clientErr.Error())
			ctx.cliConn.Close()
			ctx.BeReconnect()

			zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id)
			atomic.StoreInt32(&ctx.pBackConn.State, 0)
			ctx.isGetBackend = false
			return clientErr, nil
		}

	retry:
		if pktPos < pos {
			/* Copy as much of the 6-byte header as this buffer holds. */
			if pos-pktPos >= 6-hdrPos {
				copyLen = 6 - hdrPos
			} else {
				copyLen = pos - pktPos
			}
			copy(pktHdr[hdrPos:], ctx.recvBuf[pktPos:pktPos+copyLen])
			hdrPos += copyLen
			if hdrPos >= 6 { /* pktHdr is now complete */
				pktLen = int(binary.BigEndian.Uint32(pktHdr[1:5]))
				if copyLen != 6 { /* only part of the header came from this buffer, so the packet actually started 6-copyLen bytes before offset 0 */
					pktPos = copyLen - 6 /* deliberately negative */
				}
				pktPos += 1 + pktLen
				msgType = pktHdr[0]
				hdrPos = 0
			} else {
				pktPos += copyLen
			}
		}
		if pktPos >= pos {
			if pktPos == pos {
				// if msgType == 'Z' || msgType == 'E' { /*最后一个包是Z或E，说明接收结束了*/
				// 	if msgType == 'Z' {
				// 		ctx.transState = pktHdr[5]
				// 	}
				// 	goto success
				// }
				if msgType == 'Z' { /* last packet is ReadyForQuery: response finished */
					ctx.transState = pktHdr[5] /* transaction status byte: 'I', 'T' or 'E' */
					goto success
				}
			}
			/* Next packet begins in a future read: rebase offsets. */
			pktPos -= pos
			pos = 0
			continue
		} else { /* buffer holds more than one message: process the next one */
			goto retry
		}
	}

success:

	if !ctx.isInUnnamedPrepared {
		for prepareName, cuPre := range ctx.cupreMap {
			var pdata *PrepareData
			if cuPre.prepareRequestLen == 0 { /* prepare data already cached, but this backend had not parsed it yet */
				var ok bool
				pdata, ok = ctx.cachedPrepare.Get(prepareName)
				if !ok {
					zap.S().Panicf("BUG!!!!!!")
				}
			} else {
				pdata = new(PrepareData)
				ctx.cachedPrepare.AddPrepare(prepareName, pdata)
				pdata.PrepareRequest = cuPre.prepareRequestData
			}

			pdata.BackendMap = make(map[int32]*PrepareInBackend)
			pib := new(PrepareInBackend)
			pib.PrepareId = cuPre.backendPrepareId
			pib.BackConn = ctx.pBackConn
			pib.ReConnCnt = ctx.pBackConn.ReConnCnt /* snapshot the backend's reconnect count; a later mismatch reveals the backend was reconnected since */
			pdata.BackendMap[ctx.pBackConn.Id] = pib
			zap.S().Infof("Client(%d): store prepare name=%s to backend(%d), backendPrepareId=%d",
				ctx.Pid, prepareName, ctx.pBackConn.Id, cuPre.backendPrepareId)
		}
		if len(ctx.cupreMap) > 1 {
			zap.S().Infof("many prepare in one request!")
		}
	}

	/* Clear the temporary prepare-data map. */
	ctx.cupreMap = make(map[string]*CuPre)
	if ctx.transState == 'I' { /* idle: release the backend connection */
		zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id)
		atomic.StoreInt32(&ctx.pBackConn.State, 0)
		ctx.isGetBackend = false
		ctx.pBackConn = nil
	}
	return nil, nil
}

// sendToBackend forwards the buffered request (sendBuf[:sendLen]) to the
// currently held backend, without retrying. On failure it closes the
// client socket, reconnects and releases the backend, and returns the
// send error.
func (ctx *ConnContext) sendToBackend() error {
	ctx.pSendBackConn = ctx.pBackConn
	if _, err := sendData(ctx.pBackConn.Conn, ctx.sendBuf[:ctx.sendLen]); err != nil {
		zap.S().Infof("Client(%d): SB(%d, %c): ERROR : %s", ctx.Pid, ctx.pBackConn.Id, ctx.sendBuf[0], err.Error())
		ctx.cliConn.Close()
		zap.S().Infof("Client(%d): reconnect backend(%d)", ctx.Pid, ctx.pBackConn.Id)
		ctx.BeReconnect()
		if ctx.isGetBackend {
			zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id)
			atomic.StoreInt32(&ctx.pBackConn.State, 0)
			ctx.isGetBackend = false
		}
		return err
	}
	return nil
}

/*
sendToBackendWithRetry sends ctx.sendBuf[:ctx.sendLen] to the backend,
reconnecting and retrying while the session is outside a transaction.

The original version looped forever when the backend never recovered
(see its own FIXME) and retried without any pause; this version gives up
after ~20 seconds with a 50 ms pause between attempts, mirroring the
retry policy used by ProcessQ.
*/
func (ctx *ConnContext) sendToBackendWithRetry() error {
	var err error
	ctx.pSendBackConn = ctx.pBackConn

	var errBeginTime int64 = 0
	for {
		_, err = sendData(ctx.pBackConn.Conn, ctx.sendBuf[:ctx.sendLen])
		if err == nil {
			return nil
		}
		if errBeginTime == 0 {
			errBeginTime = time.Now().Unix() /* start of the failure window */
		}
		zap.S().Infof("Client(%d): SB(%d): ERROR: %s", ctx.Pid, ctx.pBackConn.Id, err.Error())
		if ctx.transState != 'I' { /* inside a transaction: not recoverable */
			zap.S().Infof("Client(%d): SB(%d): reconnect backend, close client", ctx.Pid, ctx.pBackConn.Id)
			ctx.cliConn.Close()
			ctx.BeReconnect()
			zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id)
			atomic.StoreInt32(&ctx.pBackConn.State, 0)
			return err
		}
		if time.Now().Unix()-errBeginTime > 20 {
			/* FIX: bound the retry loop so a dead backend cannot hang the
			session forever. */
			zap.S().Infof("ERROR: wait 20 seconds, but the backend(%v) is still not recovered!", ctx.pool.Conf.PortalList)
			ctx.cliConn.Close()
			ctx.BeReconnect()
			return err
		}
		/* Brief pause so a dead backend does not become a busy loop. */
		time.Sleep(50 * time.Millisecond)
		ctx.BeReconnect()
	}
}

// handleError performs the common failure cleanup: close the client
// socket, roll back any open backend transaction, and return the backend
// connection to the pool.
func (ctx *ConnContext) handleError() {
	ctx.cliConn.Close()

	if ctx.transState != 'I' {
		CleanupTrans(ctx.pBackConn.Conn)
	}

	if !ctx.isGetBackend {
		return
	}
	zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id)
	atomic.StoreInt32(&ctx.pBackConn.State, 0)
	ctx.isGetBackend = false
}

/*
ProcessC handles a frontend Close ('C') message.

Only closing a prepared statement ('S' at byte 5) is supported; a close
portal ('P') request is rejected with a FATAL error response followed by
ReadyForQuery.  For the supported case the prepared statement is removed
from the cache and CloseComplete + ReadyForQuery are sent to the client.

Error-path cleanup (close client, rollback transaction, release backend)
is delegated to ctx.handleError, which performs exactly the sequence that
was previously inlined here three times.
*/
func (ctx *ConnContext) ProcessC() error {
	var err error
	if ctx.recvBuf[5] != 'S' { /* the other valid value is 'P' (close portal), but libpq does not operate on portals directly */
		var errFields = [...]string{
			"SFATAL",
			"VFATAL",
			"C0A000",
			"MOnly support close prepare stmt.",
			"Fwww.cstech.ltd",
			"L1480",
			"Rfeature_not_supported",
		}
		zap.S().Infof("Client(%d, C): SC: ERROR: L1480, feature_not_supported", ctx.Pid)
		err = sendPqErrorResponse(ctx.cliConn, errFields[:])
		if err != nil {
			zap.S().Infof("Reply to client error: %s", err.Error())
			ctx.handleError()
			return err
		}
		zap.S().Infof("Client(%d, C): SC: Ready for query", ctx.Pid)
		err = sendPqReadyForQuery(ctx.cliConn, ctx.transState)
		if err != nil {
			zap.S().Infof("Reply to client error: %s", err.Error())
			ctx.handleError()
			return err
		}
		/* BUG FIX: previously control fell through to the close-prepare
		path below, which interpreted the portal name as a statement name
		and sent a second response set after the FATAL reply. The request
		has been fully answered at this point. */
		return nil
	}

	prepareName := string(ctx.msgList[0][6:])
	_, ok := ctx.cachedPrepare.Get(prepareName)
	if !ok { /* client closed a prepare that does not exist */
		var errFields = [...]string{
			"SFATAL",
			"VFATAL",
			"C26000",
			"Mprepare not exists",
			"Fwww.cstech.ltd",
			"L1524",
			"Rinvalid_sql_statement_name",
		}
		/* NOTE(review): Panicf never returns, so the response code below is
		currently dead; it becomes live again if this is downgraded back to
		the plain Infof it once was. */
		zap.S().Panicf("Client(%d, C): SC: ERROR: L1524, invalid_sql_statement_name", ctx.Pid)
		err = sendPqErrorResponse(ctx.cliConn, errFields[:])
		if err != nil {
			zap.S().Infof("Client(%d, C): SC: ERROR: %s", ctx.Pid, err.Error())
			ctx.handleError()
			return err
		}
		sendPqReadyForQuery(ctx.cliConn, ctx.transState)
	}
	ctx.cachedPrepare.DeletePrepare(prepareName)
	zap.S().Infof("Client(%d, C): SC: close client", ctx.Pid)
	err = sendPqCloseCompletion(ctx.cliConn)
	if err != nil {
		zap.S().Infof("Client(%d, C): SC: ERROR: %s", ctx.Pid, err.Error())
		ctx.handleError()
		return err
	}
	zap.S().Infof("Client(%d, C): SC: Ready for query", ctx.Pid)
	err = sendPqReadyForQuery(ctx.cliConn, ctx.transState)
	if err != nil {
		zap.S().Infof("Client(%d, C): SC: ERROR: %s", ctx.Pid, err.Error())
		/* Kept inline (not handleError) to preserve the extra cleanup log
		emitted only on this path. */
		ctx.cliConn.Close()
		if ctx.transState != 'I' {
			zap.S().Infof("Client(%d, B): Cleanup backend(%d) transaction", ctx.Pid, ctx.pBackConn.Id)
			CleanupTrans(ctx.pBackConn.Conn)
		}
		if ctx.isGetBackend {
			zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id)
			atomic.StoreInt32(&ctx.pBackConn.State, 0)
			ctx.isGetBackend = false
		}
		return err
	}
	return nil
}

/*
handleConnection drives one client connection for its whole lifetime:
authenticate, initialize the per-connection context, then loop reading
requests and dispatching on the frontend message type byte until the
client terminates or an error occurs.
*/
func handleConnection(cliConn net.Conn) {
	var ctx ConnContext
	var err error
	var ret int
	var pool *Pool

	/* NOTE(review): ctx.Pid is still its zero value here, since ctx.Init
	has not run yet — confirm handleAuth does not depend on a real pid. */
	ret, pool = handleAuth(cliConn, ctx.Pid)
	if ret != 0 {
		cliConn.Close()
		return
	}

	ctx.Init(cliConn, pool.Conf.MsgBufSize)
	ctx.pool = pool
	ctx.cachedPrepare.Init()

	collector.FrontendConnectionsIncrease(pool.Conf.ID)
	defer collector.FrontendConnectionsDecrease(pool.Conf.ID)
	defer ctx.cachedPrepare.Discard()

	for {
		err = ctx.recvRequest()
		if err != nil {
			zap.S().Infof("Client(%d): Recv from client error: %s", ctx.Pid, err.Error())
			/* Roll back any open transaction on the backend, then release it. */
			if ctx.transState != 'I' {
				CleanupTrans(ctx.pBackConn.Conn)
			}
			if ctx.isGetBackend {
				zap.S().Debugf("Client(%d): release backend(%d)", ctx.Pid, ctx.pBackConn.Id)
				atomic.StoreInt32(&ctx.pBackConn.State, 0)
				ctx.isGetBackend = false
			}
			cliConn.Close()
			return
		}

		collector.ActiveFrontendConnectionsIncrease(pool.Conf.ID)
		collector.IncreaseTotalRequests(pool.Conf.ID)

		/* Dispatch on the frontend message type byte (idiomatic switch
		replaces the original if/else-if chain; behavior is unchanged). */
		switch ctx.recvBuf[0] {
		case 'X': /* Terminate */
			ctx.ProcessX()
			collector.ActiveFrontendConnectionsDecrease(pool.Conf.ID)
			return
		case 'Q': /* simple query */
			err = ctx.ProcessQ()
			collector.IncreaseSimpleQueries(pool.Conf.ID)
			collector.ActiveFrontendConnectionsDecrease(pool.Conf.ID)
			if err != nil {
				return
			}
		case 'P', 'B', 'D': /* extended query protocol */
			err = ctx.ProcessExtendedQuery()
			collector.IncreaseExtendedQueries(pool.Conf.ID)
			collector.ActiveFrontendConnectionsDecrease(pool.Conf.ID)
			if err != nil {
				return
			}
		case 'C': /* Close */
			err = ctx.ProcessC()
			collector.ActiveFrontendConnectionsDecrease(pool.Conf.ID)
			if err != nil {
				return
			}
		default:
			collector.ActiveFrontendConnectionsDecrease(pool.Conf.ID)
			zap.S().Panicf("unknown message(%c)", ctx.msgList[0][0])
		}
	}
}
